/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.runners.gaia.translation.wrappers;



import com.bff.gaia.unified.runners.core.construction.SerializablePipelineOptions;

import com.bff.gaia.unified.runners.gaia.metrics.GaiaMetricContainer;

import com.bff.gaia.unified.runners.gaia.metrics.ReaderInvocationUtil;

import com.bff.gaia.unified.sdk.io.BoundedSource;

import com.bff.gaia.unified.sdk.io.Source;

import com.bff.gaia.unified.sdk.options.PipelineOptions;

import com.bff.gaia.unified.sdk.transforms.windowing.GlobalWindow;

import com.bff.gaia.unified.sdk.transforms.windowing.PaneInfo;

import com.bff.gaia.unified.sdk.util.WindowedValue;

import com.bff.gaia.api.common.io.DefaultInputSplitAssigner;

import com.bff.gaia.api.common.io.InputFormat;

import com.bff.gaia.api.common.io.RichInputFormat;

import com.bff.gaia.api.common.io.statistics.BaseStatistics;

import com.bff.gaia.configuration.Configuration;

import com.bff.gaia.core.io.InputSplitAssigner;

import org.joda.time.Instant;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;



import java.io.IOException;

import java.util.List;



/**
 * Wrapper for executing a {@link Source} as a Gaia {@link InputFormat}.
 *
 * <p>The wrapped {@link BoundedSource} is split into {@link SourceInputSplit}s; for each split a
 * {@link BoundedSource.BoundedReader} is created and its elements are emitted as {@link
 * WindowedValue}s in the {@link GlobalWindow} with {@link PaneInfo#NO_FIRING}.
 */
public class SourceInputFormat<T> extends RichInputFormat<WindowedValue<T>, SourceInputSplit<T>> {
  private static final Logger LOG = LoggerFactory.getLogger(SourceInputFormat.class);

  private final String stepName;
  private final BoundedSource<T> initialSource;

  /** Materialized options; transient, so it is {@code null} until (re)resolved on this instance. */
  private transient PipelineOptions options;

  private final SerializablePipelineOptions serializedOptions;

  private transient BoundedSource.BoundedReader<T> reader;

  /** Whether {@link #reader} is currently positioned on a record that has not been emitted yet. */
  private boolean inputAvailable = false;

  private transient ReaderInvocationUtil<T, BoundedSource.BoundedReader<T>> readerInvoker;

  /**
   * Creates the input format.
   *
   * @param stepName name of the step, handed to the {@link ReaderInvocationUtil}
   * @param initialSource the unsplit source to read from
   * @param options pipeline options; kept only in their serializable wrapper
   */
  public SourceInputFormat(
      String stepName, BoundedSource<T> initialSource, PipelineOptions options) {
    this.stepName = stepName;
    this.initialSource = initialSource;
    this.serializedOptions = new SerializablePipelineOptions(options);
  }

  /**
   * Materializes the pipeline options from their serializable wrapper.
   *
   * @param configuration unused
   */
  @Override
  public void configure(Configuration configuration) {
    options = serializedOptions.get();
  }

  /**
   * Returns the pipeline options, resolving them from {@link #serializedOptions} on first use.
   *
   * <p>{@link #options} is transient and otherwise only assigned in {@link
   * #configure(Configuration)}; this fallback keeps the other methods from handing {@code null}
   * options to the source when {@code configure} has not run on this instance.
   */
  private PipelineOptions resolveOptions() {
    if (options == null) {
      options = serializedOptions.get();
    }
    return options;
  }

  /**
   * Creates and starts a reader for the given split.
   *
   * @param sourceInputSplit the split whose source is read
   * @throws IOException if the reader cannot be created or started
   */
  @Override
  public void open(SourceInputSplit<T> sourceInputSplit) throws IOException {
    GaiaMetricContainer metricContainer = new GaiaMetricContainer(getRuntimeContext());

    readerInvoker = new ReaderInvocationUtil<>(stepName, serializedOptions.get(), metricContainer);

    reader = ((BoundedSource<T>) sourceInputSplit.getSource()).createReader(resolveOptions());
    // The result of starting the reader tells us whether a first record is available.
    inputAvailable = readerInvoker.invokeStart(reader);
  }

  /**
   * Reports the estimated size of the source; record count and record width are reported as
   * unknown.
   *
   * @param baseStatistics previously computed statistics; not used
   * @return statistics carrying the estimated size in bytes, or {@code null} if the estimate
   *     could not be obtained (the failure is logged, not propagated)
   */
  @Override
  public BaseStatistics getStatistics(BaseStatistics baseStatistics) throws IOException {
    try {
      final long estimatedSize = initialSource.getEstimatedSizeBytes(resolveOptions());

      return new BaseStatistics() {
        @Override
        public long getTotalInputSize() {
          return estimatedSize;
        }

        @Override
        public long getNumberOfRecords() {
          return BaseStatistics.NUM_RECORDS_UNKNOWN;
        }

        @Override
        public float getAverageRecordWidth() {
          return BaseStatistics.AVG_RECORD_BYTES_UNKNOWN;
        }
      };
    } catch (Exception e) {
      // Statistics are best-effort. No "{}" placeholder here: with a lone Throwable argument the
      // (String, Throwable) overload is chosen, so a placeholder would be printed literally.
      LOG.warn("Could not read Source statistics.", e);
    }

    return null;
  }

  /**
   * Splits the initial source, aiming for {@code numSplits} shards of roughly equal size.
   *
   * @param numSplits requested number of splits; values below 1 are treated as 1
   * @return one input split per shard returned by the source, numbered from 0
   * @throws IOException if the size estimate or the split operation fails
   */
  @Override
  @SuppressWarnings("unchecked")
  public SourceInputSplit<T>[] createInputSplits(int numSplits) throws IOException {
    try {
      PipelineOptions pipelineOptions = resolveOptions();
      // Guard the divisor so a hint of 0 (or less) cannot cause a division by zero.
      int requestedSplits = Math.max(1, numSplits);
      long desiredSizeBytes =
          initialSource.getEstimatedSizeBytes(pipelineOptions) / requestedSplits;
      List<? extends Source<T>> shards = initialSource.split(desiredSizeBytes, pipelineOptions);
      int numShards = shards.size();
      // Generic array creation is not possible; the raw array is only filled with
      // SourceInputSplit<T> instances below.
      SourceInputSplit<T>[] sourceInputSplits = new SourceInputSplit[numShards];
      for (int i = 0; i < numShards; i++) {
        sourceInputSplits[i] = new SourceInputSplit<>(shards.get(i), i);
      }
      return sourceInputSplits;
    } catch (Exception e) {
      throw new IOException("Could not create input splits from Source.", e);
    }
  }

  /**
   * Returns a {@link DefaultInputSplitAssigner} over the given splits.
   *
   * @param sourceInputSplits the splits to hand out
   */
  @Override
  public InputSplitAssigner getInputSplitAssigner(final SourceInputSplit[] sourceInputSplits) {
    return new DefaultInputSplitAssigner(sourceInputSplits);
  }

  /** Returns {@code true} once the reader has no further record available. */
  @Override
  public boolean reachedEnd() throws IOException {
    return !inputAvailable;
  }

  /**
   * Emits the record the reader is currently positioned on and advances the reader.
   *
   * @param t reuse object; ignored
   * @return the current element wrapped in the global window with its reader timestamp, or
   *     {@code null} if no input is available
   * @throws IOException if advancing the reader fails
   */
  @Override
  public WindowedValue<T> nextRecord(WindowedValue<T> t) throws IOException {
    if (!inputAvailable) {
      return null;
    }

    // Capture value and timestamp before advancing, since advancing moves the reader on.
    final T current = reader.getCurrent();
    final Instant timestamp = reader.getCurrentTimestamp();
    // advance reader to have a record ready next time
    inputAvailable = readerInvoker.invokeAdvance(reader);
    return WindowedValue.of(current, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
  }

  /**
   * Closes the reader if one was created.
   *
   * @throws IOException if closing the reader fails
   */
  @Override
  public void close() throws IOException {
    // TODO null check can be removed once GAIA-3796 is fixed
    if (reader != null) {
      reader.close();
    }
  }
}